From bb33743d43018a9c2c0bfa0c2b694b7faf673633 Mon Sep 17 00:00:00 2001 From: "akw27@arcadians.cl.cam.ac.uk" Date: Tue, 7 Jun 2005 15:26:07 +0000 Subject: [PATCH] bitkeeper revision 1.1686.1.1 (42a5bc8fhkR_9WfuD9N-je5TT27yDw) Parallax code cleanups. Signed-off-by: andrew.warfield@cl.cam.ac.uk --- tools/blktap/block-async.c | 479 +++++++++++++++++----------------- tools/blktap/block-async.h | 40 +-- tools/blktap/parallax.c | 107 ++++---- tools/blktap/requests-async.c | 139 +++++----- 4 files changed, 374 insertions(+), 391 deletions(-) diff --git a/tools/blktap/block-async.c b/tools/blktap/block-async.c index 6f26071ade..a0460de6fc 100755 --- a/tools/blktap/block-async.c +++ b/tools/blktap/block-async.c @@ -31,47 +31,47 @@ */ struct read_args { - u64 addr; + u64 addr; }; struct write_args { - u64 addr; - char *block; + u64 addr; + char *block; }; struct alloc_args { - char *block; + char *block; }; struct pending_io_req { - enum {IO_READ, IO_WRITE, IO_ALLOC, IO_RWAKE, IO_WWAKE} op; - union { - struct read_args r; - struct write_args w; - struct alloc_args a; - } u; - io_cb_t cb; - void *param; + enum {IO_READ, IO_WRITE, IO_ALLOC, IO_RWAKE, IO_WWAKE} op; + union { + struct read_args r; + struct write_args w; + struct alloc_args a; + } u; + io_cb_t cb; + void *param; }; void radix_lock_init(struct radix_lock *r) { - int i; - - pthread_mutex_init(&r->lock, NULL); - for (i=0; i < 1024; i++) { - r->lines[i] = 0; - r->waiters[i] = NULL; - r->state[i] = ANY; - } + int i; + + pthread_mutex_init(&r->lock, NULL); + for (i=0; i < 1024; i++) { + r->lines[i] = 0; + r->waiters[i] = NULL; + r->state[i] = ANY; + } } /* maximum outstanding I/O requests issued asynchronously */ /* must be a power of 2.*/ -#define MAX_PENDING_IO 1024 //1024 +#define MAX_PENDING_IO 1024 /* how many threads to concurrently issue I/O to the disk. */ -#define IO_POOL_SIZE 10 //10 +#define IO_POOL_SIZE 10 static struct pending_io_req pending_io_reqs[MAX_PENDING_IO]; static int pending_io_list[MAX_PENDING_IO]; @@ -87,276 +87,268 @@ static pthread_cond_t pending_io_cond = PTHREAD_COND_INITIALIZER; static void init_pending_io(void) { - int i; + int i; - for (i=0; iop = IO_READ; - req->u.r.addr = addr; - req->cb = cb; - req->param = param; - + struct pending_io_req *req; + + pthread_mutex_lock(&pending_io_lock); + assert(CAN_PRODUCE_PENDING_IO); + + req = PENDING_IO_ENT(io_prod++); + DPRINTF("Produce (R) %lu (%p)\n", io_prod - 1, req); + req->op = IO_READ; + req->u.r.addr = addr; + req->cb = cb; + req->param = param; + pthread_cond_signal(&pending_io_cond); - pthread_mutex_unlock(&pending_io_lock); + pthread_mutex_unlock(&pending_io_lock); } void block_write(u64 addr, char *block, io_cb_t cb, void *param) { - struct pending_io_req *req; - - pthread_mutex_lock(&pending_io_lock); - assert(CAN_PRODUCE_PENDING_IO); - - req = PENDING_IO_ENT(io_prod++); - DPRINTF("Produce (W) %lu (%p)\n", io_prod - 1, req); - req->op = IO_WRITE; - req->u.w.addr = addr; - req->u.w.block = block; - req->cb = cb; - req->param = param; - + struct pending_io_req *req; + + pthread_mutex_lock(&pending_io_lock); + assert(CAN_PRODUCE_PENDING_IO); + + req = PENDING_IO_ENT(io_prod++); + DPRINTF("Produce (W) %lu (%p)\n", io_prod - 1, req); + req->op = IO_WRITE; + req->u.w.addr = addr; + req->u.w.block = block; + req->cb = cb; + req->param = param; + pthread_cond_signal(&pending_io_cond); - pthread_mutex_unlock(&pending_io_lock); + pthread_mutex_unlock(&pending_io_lock); } void block_alloc(char *block, io_cb_t cb, void *param) { - struct pending_io_req *req; - - pthread_mutex_lock(&pending_io_lock); - assert(CAN_PRODUCE_PENDING_IO); - - req = PENDING_IO_ENT(io_prod++); - req->op = IO_ALLOC; - req->u.a.block = block; - req->cb = cb; - req->param = param; + struct pending_io_req *req; + pthread_mutex_lock(&pending_io_lock); + assert(CAN_PRODUCE_PENDING_IO); + + req = PENDING_IO_ENT(io_prod++); + req->op = IO_ALLOC; + req->u.a.block = block; + req->cb = cb; + req->param = param; + pthread_cond_signal(&pending_io_cond); - pthread_mutex_unlock(&pending_io_lock); + pthread_mutex_unlock(&pending_io_lock); } void block_rlock(struct radix_lock *r, int row, io_cb_t cb, void *param) { - struct io_ret ret; - pthread_mutex_lock(&r->lock); - - if (( r->lines[row] >= 0 ) && (r->state[row] != STOP)) { - r->lines[row]++; - r->state[row] = READ; - DPRINTF("RLOCK : %3d (row: %d)\n", r->lines[row], row); - pthread_mutex_unlock(&r->lock); - ret.type = IO_INT_T; - ret.u.i = 0; - cb(ret, param); - } else { - struct radix_wait **rwc; - struct radix_wait *rw = - (struct radix_wait *) malloc (sizeof(struct radix_wait)); - DPRINTF("RLOCK : %3d (row: %d) -- DEFERRED!\n", r->lines[row], row); - rw->type = RLOCK; - rw->param = param; - rw->cb = cb; - rw->next = NULL; - /* append to waiters list. */ - rwc = &r->waiters[row]; - while (*rwc != NULL) rwc = &(*rwc)->next; - *rwc = rw; - pthread_mutex_unlock(&r->lock); - return; - } + struct io_ret ret; + pthread_mutex_lock(&r->lock); + + if (( r->lines[row] >= 0 ) && (r->state[row] != STOP)) { + r->lines[row]++; + r->state[row] = READ; + DPRINTF("RLOCK : %3d (row: %d)\n", r->lines[row], row); + pthread_mutex_unlock(&r->lock); + ret.type = IO_INT_T; + ret.u.i = 0; + cb(ret, param); + } else { + struct radix_wait **rwc; + struct radix_wait *rw = + (struct radix_wait *) malloc (sizeof(struct radix_wait)); + DPRINTF("RLOCK : %3d (row: %d) -- DEFERRED!\n", r->lines[row], row); + rw->type = RLOCK; + rw->param = param; + rw->cb = cb; + rw->next = NULL; + /* append to waiters list. */ + rwc = &r->waiters[row]; + while (*rwc != NULL) rwc = &(*rwc)->next; + *rwc = rw; + pthread_mutex_unlock(&r->lock); + return; + } } void block_wlock(struct radix_lock *r, int row, io_cb_t cb, void *param) { - struct io_ret ret; - pthread_mutex_lock(&r->lock); - - /* the second check here is redundant -- just here for debugging now. */ - if ((r->state[row] == ANY) && ( r->lines[row] == 0 )) { - r->state[row] = STOP; - r->lines[row] = -1; - DPRINTF("WLOCK : %3d (row: %d)\n", r->lines[row], row); - pthread_mutex_unlock(&r->lock); - ret.type = IO_INT_T; - ret.u.i = 0; - cb(ret, param); - } else { - struct radix_wait **rwc; - struct radix_wait *rw = - (struct radix_wait *) malloc (sizeof(struct radix_wait)); - DPRINTF("WLOCK : %3d (row: %d) -- DEFERRED!\n", r->lines[row], row); - rw->type = WLOCK; - rw->param = param; - rw->cb = cb; - rw->next = NULL; - /* append to waiters list. */ - rwc = &r->waiters[row]; - while (*rwc != NULL) rwc = &(*rwc)->next; - *rwc = rw; - pthread_mutex_unlock(&r->lock); - return; - } + struct io_ret ret; + pthread_mutex_lock(&r->lock); + + /* the second check here is redundant -- just here for debugging now. */ + if ((r->state[row] == ANY) && ( r->lines[row] == 0 )) { + r->state[row] = STOP; + r->lines[row] = -1; + DPRINTF("WLOCK : %3d (row: %d)\n", r->lines[row], row); + pthread_mutex_unlock(&r->lock); + ret.type = IO_INT_T; + ret.u.i = 0; + cb(ret, param); + } else { + struct radix_wait **rwc; + struct radix_wait *rw = + (struct radix_wait *) malloc (sizeof(struct radix_wait)); + DPRINTF("WLOCK : %3d (row: %d) -- DEFERRED!\n", r->lines[row], row); + rw->type = WLOCK; + rw->param = param; + rw->cb = cb; + rw->next = NULL; + /* append to waiters list. */ + rwc = &r->waiters[row]; + while (*rwc != NULL) rwc = &(*rwc)->next; + *rwc = rw; + pthread_mutex_unlock(&r->lock); + return; + } } /* called with radix_lock locked and lock count of zero. */ static void wake_waiters(struct radix_lock *r, int row) { - struct pending_io_req *req; - struct radix_wait *rw; - - DPRINTF("prewake\n"); - if (r->lines[row] != 0) return; - if (r->waiters[row] == NULL) {DPRINTF("nowaiters!\n");return;} - - DPRINTF("wake\n"); - if (r->waiters[row]->type == WLOCK) { - rw = r->waiters[row]; - pthread_mutex_lock(&pending_io_lock); - assert(CAN_PRODUCE_PENDING_IO); - - req = PENDING_IO_ENT(io_prod++); - DPRINTF("Produce (WWAKE) %lu (%p)\n", io_prod - 1, req); - req->op = IO_WWAKE; - req->cb = rw->cb; - req->param = rw->param; - r->lines[row] = -1; /* write lock the row. */ - r->state[row] = STOP; - r->waiters[row] = rw->next; - free(rw); - pthread_mutex_unlock(&pending_io_lock); - } else /* RLOCK */ { - while ((r->waiters[row] != NULL) && (r->waiters[row]->type == RLOCK)) { - rw = r->waiters[row]; - pthread_mutex_lock(&pending_io_lock); - assert(CAN_PRODUCE_PENDING_IO); - - req = PENDING_IO_ENT(io_prod++); - DPRINTF("Produce (RWAKE) %lu (%p)\n", io_prod - 1, req); - req->op = IO_RWAKE; - req->cb = rw->cb; - req->param = rw->param; - r->lines[row]++; /* read lock the row. */ - r->state[row] = READ; - r->waiters[row] = rw->next; - free(rw); - pthread_mutex_unlock(&pending_io_lock); - } - if (r->waiters[row] != NULL) /* There is a write queued still */ - r->state[row] = STOP; - } - - DPRINTF("wakedone\n"); - DPRINTF("prod: %lu cons: %lu free: %lu\n", io_prod, io_cons, io_free); - pthread_mutex_lock(&pending_io_lock); + struct pending_io_req *req; + struct radix_wait *rw; + + if (r->lines[row] != 0) return; + if (r->waiters[row] == NULL) return; + + if (r->waiters[row]->type == WLOCK) { + + rw = r->waiters[row]; + pthread_mutex_lock(&pending_io_lock); + assert(CAN_PRODUCE_PENDING_IO); + + req = PENDING_IO_ENT(io_prod++); + req->op = IO_WWAKE; + req->cb = rw->cb; + req->param = rw->param; + r->lines[row] = -1; /* write lock the row. */ + r->state[row] = STOP; + r->waiters[row] = rw->next; + free(rw); + pthread_mutex_unlock(&pending_io_lock); + + } else /* RLOCK */ { + + while ((r->waiters[row] != NULL) && (r->waiters[row]->type == RLOCK)) { + rw = r->waiters[row]; + pthread_mutex_lock(&pending_io_lock); + assert(CAN_PRODUCE_PENDING_IO); + + req = PENDING_IO_ENT(io_prod++); + req->op = IO_RWAKE; + req->cb = rw->cb; + req->param = rw->param; + r->lines[row]++; /* read lock the row. */ + r->state[row] = READ; + r->waiters[row] = rw->next; + free(rw); + pthread_mutex_unlock(&pending_io_lock); + } + + if (r->waiters[row] != NULL) /* There is a write queued still */ + r->state[row] = STOP; + } + + pthread_mutex_lock(&pending_io_lock); pthread_cond_signal(&pending_io_cond); - pthread_mutex_unlock(&pending_io_lock); + pthread_mutex_unlock(&pending_io_lock); } void block_runlock(struct radix_lock *r, int row, io_cb_t cb, void *param) { - struct io_ret ret; + struct io_ret ret; - pthread_mutex_lock(&r->lock); - assert(r->lines[row] > 0); /* try to catch misuse. */ - r->lines[row]--; - DPRINTF("RUNLOCK: %3d (row: %d)\n", r->lines[row], row); - if (r->lines[row] == 0) { - r->state[row] = ANY; - wake_waiters(r, row); - } - pthread_mutex_unlock(&r->lock); - cb(ret, param); + pthread_mutex_lock(&r->lock); + assert(r->lines[row] > 0); /* try to catch misuse. */ + r->lines[row]--; + if (r->lines[row] == 0) { + r->state[row] = ANY; + wake_waiters(r, row); + } + pthread_mutex_unlock(&r->lock); + cb(ret, param); } void block_wunlock(struct radix_lock *r, int row, io_cb_t cb, void *param) { - struct io_ret ret; - - pthread_mutex_lock(&r->lock); - assert(r->lines[row] == -1); /* try to catch misuse. */ - r->lines[row] = 0; - r->state[row] = ANY; - DPRINTF("WUNLOCK: %3d (row: %d)\n", r->lines[row], row); - wake_waiters(r, row); - pthread_mutex_unlock(&r->lock); - cb(ret, param); + struct io_ret ret; + + pthread_mutex_lock(&r->lock); + assert(r->lines[row] == -1); /* try to catch misuse. */ + r->lines[row] = 0; + r->state[row] = ANY; + wake_waiters(r, row); + pthread_mutex_unlock(&r->lock); + cb(ret, param); } /* consumer calls */ static void do_next_io_req(struct pending_io_req *req) { - struct io_ret ret; - void *param; - - switch (req->op) { - case IO_READ: - ret.type = IO_BLOCK_T; - ret.u.b = readblock(req->u.r.addr); - break; - case IO_WRITE: - ret.type = IO_INT_T; - ret.u.i = writeblock(req->u.w.addr, req->u.w.block); - DPRINTF("wrote %d at %Lu\n", *(int *)(req->u.w.block), req->u.w.addr); - break; - case IO_ALLOC: - ret.type = IO_ADDR_T; - ret.u.a = allocblock(req->u.a.block); - break; - case IO_RWAKE: - DPRINTF("WAKE DEFERRED RLOCK!\n"); - ret.type = IO_INT_T; - ret.u.i = 0; - break; - case IO_WWAKE: - DPRINTF("WAKE DEFERRED WLOCK!\n"); - ret.type = IO_INT_T; - ret.u.i = 0; - break; - default: - DPRINTF("Unknown IO operation on pending list!\n"); - return; - } + struct io_ret ret; + void *param; + + switch (req->op) { + case IO_READ: + ret.type = IO_BLOCK_T; + ret.u.b = readblock(req->u.r.addr); + break; + case IO_WRITE: + ret.type = IO_INT_T; + ret.u.i = writeblock(req->u.w.addr, req->u.w.block); + DPRINTF("wrote %d at %Lu\n", *(int *)(req->u.w.block), req->u.w.addr); + break; + case IO_ALLOC: + ret.type = IO_ADDR_T; + ret.u.a = allocblock(req->u.a.block); + break; + case IO_RWAKE: + DPRINTF("WAKE DEFERRED RLOCK!\n"); + ret.type = IO_INT_T; + ret.u.i = 0; + break; + case IO_WWAKE: + DPRINTF("WAKE DEFERRED WLOCK!\n"); + ret.type = IO_INT_T; + ret.u.i = 0; + break; + default: + DPRINTF("Unknown IO operation on pending list!\n"); + return; + } + + param = req->param; + pthread_mutex_lock(&pending_io_lock); + pending_io_list[PENDING_IO_MASK(io_free++)] = PENDING_IO_IDX(req); + pthread_mutex_unlock(&pending_io_lock); - param = req->param; - DPRINTF("freeing idx %d to slot %lu.\n", PENDING_IO_IDX(req), PENDING_IO_MASK(io_free)); - pthread_mutex_lock(&pending_io_lock); - pending_io_list[PENDING_IO_MASK(io_free++)] = PENDING_IO_IDX(req); - DPRINTF(" : prod: %lu cons: %lu free: %lu\n", io_prod, io_cons, io_free); - pthread_mutex_unlock(&pending_io_lock); - - assert(req->cb != NULL); - req->cb(ret, param); - + assert(req->cb != NULL); + req->cb(ret, param); + } void *io_thread(void *param) { - int tid; - struct pending_io_req *req; - - /* Set this thread's tid. */ + int tid; + struct pending_io_req *req; + + /* Set this thread's tid. */ tid = *(int *)param; free(param); - DPRINTF("IOT %2d started.\n", tid); - start: pthread_mutex_lock(&pending_io_lock); while (io_prod == io_cons) { @@ -369,15 +361,12 @@ start: goto start; } - req = PENDING_IO_ENT(io_cons++); - DPRINTF("IOT %2d has req %04d(%p).\n", tid, PENDING_IO_IDX(req), req); - DPRINTF(" : prod: %lu cons: %lu free: %lu\n", io_prod, io_cons, io_free); - pthread_mutex_unlock(&pending_io_lock); - + req = PENDING_IO_ENT(io_cons++); + pthread_mutex_unlock(&pending_io_lock); do_next_io_req(req); - goto start; + goto start; } @@ -385,9 +374,9 @@ static pthread_t io_pool[IO_POOL_SIZE]; void start_io_threads(void) { - int i, tid=0; - - for (i=0; i < IO_POOL_SIZE; i++) { + int i, tid=0; + + for (i=0; i < IO_POOL_SIZE; i++) { int ret, *t; t = (int *)malloc(sizeof(int)); *t = tid++; @@ -399,6 +388,6 @@ void start_io_threads(void) void init_block_async(void) { - init_pending_io(); - start_io_threads(); + init_pending_io(); + start_io_threads(); } diff --git a/tools/blktap/block-async.h b/tools/blktap/block-async.h index b19d464a52..022eea5da1 100755 --- a/tools/blktap/block-async.h +++ b/tools/blktap/block-async.h @@ -12,29 +12,29 @@ struct io_ret { - enum {IO_ADDR_T, IO_BLOCK_T, IO_INT_T} type; - union { - u64 a; - char *b; - int i; - } u; + enum {IO_ADDR_T, IO_BLOCK_T, IO_INT_T} type; + union { + u64 a; + char *b; + int i; + } u; }; typedef void (*io_cb_t)(struct io_ret r, void *param); /* per-vdi lock structures to make sure requests run in a safe order. */ struct radix_wait { - enum {RLOCK, WLOCK} type; - io_cb_t cb; - void *param; - struct radix_wait *next; + enum {RLOCK, WLOCK} type; + io_cb_t cb; + void *param; + struct radix_wait *next; }; struct radix_lock { - pthread_mutex_t lock; - int lines[1024]; - struct radix_wait *waiters[1024]; - enum {ANY, READ, STOP} state[1024]; + pthread_mutex_t lock; + int lines[1024]; + struct radix_wait *waiters[1024]; + enum {ANY, READ, STOP} state[1024]; }; void radix_lock_init(struct radix_lock *r); @@ -49,20 +49,20 @@ void init_block_async(void); static inline u64 IO_ADDR(struct io_ret r) { - assert(r.type == IO_ADDR_T); - return r.u.a; + assert(r.type == IO_ADDR_T); + return r.u.a; } static inline char *IO_BLOCK(struct io_ret r) { - assert(r.type == IO_BLOCK_T); - return r.u.b; + assert(r.type == IO_BLOCK_T); + return r.u.b; } static inline int IO_INT(struct io_ret r) { - assert(r.type == IO_INT_T); - return r.u.i; + assert(r.type == IO_INT_T); + return r.u.i; } diff --git a/tools/blktap/parallax.c b/tools/blktap/parallax.c index 50d282cf0a..3f59834f12 100644 --- a/tools/blktap/parallax.c +++ b/tools/blktap/parallax.c @@ -328,33 +328,33 @@ typedef struct { pending_t pending_list[MAX_REQUESTS]; struct cb_param { - pending_t *pent; - int segment; - u64 sector; - u64 vblock; /* for debug printing -- can be removed. */ + pending_t *pent; + int segment; + u64 sector; + u64 vblock; /* for debug printing -- can be removed. */ }; static void read_cb(struct io_ret r, void *in_param) { - struct cb_param *param = (struct cb_param *)in_param; - pending_t *p = param->pent; - int segment = param->segment; - blkif_request_t *req = p->req; + struct cb_param *param = (struct cb_param *)in_param; + pending_t *p = param->pent; + int segment = param->segment; + blkif_request_t *req = p->req; unsigned long size, offset, start; - char *dpage, *spage; + char *dpage, *spage; - spage = IO_BLOCK(r); - if (spage == NULL) { p->error++; goto finish; } - dpage = (char *)MMAP_VADDR(ID_TO_IDX(req->id), segment); + spage = IO_BLOCK(r); + if (spage == NULL) { p->error++; goto finish; } + dpage = (char *)MMAP_VADDR(ID_TO_IDX(req->id), segment); /* Calculate read size and offset within the read block. */ offset = (param->sector << SECTOR_SHIFT) % BLOCK_SIZE; size = ( blkif_last_sect (req->frame_and_sects[segment]) - blkif_first_sect(req->frame_and_sects[segment]) + 1 - ) << SECTOR_SHIFT; + ) << SECTOR_SHIFT; start = blkif_first_sect(req->frame_and_sects[segment]) - << SECTOR_SHIFT; + << SECTOR_SHIFT; DPRINTF("ParallaxRead: sect: %lld (%ld,%ld), " "vblock %llx, " @@ -371,23 +371,23 @@ static void read_cb(struct io_ret r, void *in_param) pthread_mutex_lock(&p->mutex); p->count--; - if (p->count == 0) { + if (p->count == 0) { blkif_response_t *rsp; rsp = (blkif_response_t *)req; rsp->id = req->id; rsp->operation = BLKIF_OP_READ; if (p->error == 0) { - rsp->status = BLKIF_RSP_OKAY; + rsp->status = BLKIF_RSP_OKAY; } else { - rsp->status = BLKIF_RSP_ERROR; + rsp->status = BLKIF_RSP_ERROR; } blktap_inject_response(rsp); } pthread_mutex_unlock(&p->mutex); - free(param); /* TODO: replace with cached alloc/dealloc */ + free(param); /* TODO: replace with cached alloc/dealloc */ } int parallax_read(blkif_request_t *req, blkif_t *blkif) @@ -414,21 +414,20 @@ int parallax_read(blkif_request_t *req, blkif_t *blkif) pthread_t tid; int ret; struct cb_param *p; - - /* Round the requested segment to a block address. */ - sector = req->sector_number + (8*i); - vblock = (sector << SECTOR_SHIFT) >> BLOCK_SHIFT; - - /* TODO: Replace this call to malloc with a cached allocation */ - p = (struct cb_param *)malloc(sizeof(struct cb_param)); - p->pent = pent; - p->sector = sector; - p->segment = i; - p->vblock = vblock; /* dbg */ - - /* Get that block from the store. */ - async_read(vdi, vblock, read_cb, (void *)p); - + + /* Round the requested segment to a block address. */ + sector = req->sector_number + (8*i); + vblock = (sector << SECTOR_SHIFT) >> BLOCK_SHIFT; + + /* TODO: Replace this call to malloc with a cached allocation */ + p = (struct cb_param *)malloc(sizeof(struct cb_param)); + p->pent = pent; + p->sector = sector; + p->segment = i; + p->vblock = vblock; /* dbg */ + + /* Get that block from the store. */ + async_read(vdi, vblock, read_cb, (void *)p); } return BLKTAP_STOLEN; @@ -444,33 +443,33 @@ err: static void write_cb(struct io_ret r, void *in_param) { - struct cb_param *param = (struct cb_param *)in_param; - pending_t *p = param->pent; - blkif_request_t *req = p->req; - - /* catch errors from the block code. */ - if (IO_INT(r) < 0) p->error++; - + struct cb_param *param = (struct cb_param *)in_param; + pending_t *p = param->pent; + blkif_request_t *req = p->req; + + /* catch errors from the block code. */ + if (IO_INT(r) < 0) p->error++; + pthread_mutex_lock(&p->mutex); p->count--; - if (p->count == 0) { + if (p->count == 0) { blkif_response_t *rsp; rsp = (blkif_response_t *)req; rsp->id = req->id; rsp->operation = BLKIF_OP_WRITE; if (p->error == 0) { - rsp->status = BLKIF_RSP_OKAY; + rsp->status = BLKIF_RSP_OKAY; } else { - rsp->status = BLKIF_RSP_ERROR; + rsp->status = BLKIF_RSP_ERROR; } blktap_inject_response(rsp); } pthread_mutex_unlock(&p->mutex); - free(param); /* TODO: replace with cached alloc/dealloc */ + free(param); /* TODO: replace with cached alloc/dealloc */ } int parallax_write(blkif_request_t *req, blkif_t *blkif) @@ -496,7 +495,7 @@ int parallax_write(blkif_request_t *req, blkif_t *blkif) for (i = 0; i < req->nr_segments; i++) { struct cb_param *p; - + spage = (char *)MMAP_VADDR(ID_TO_IDX(req->id), i); /* Round the requested segment to a block address. */ @@ -509,7 +508,7 @@ int parallax_write(blkif_request_t *req, blkif_t *blkif) offset = (sector << SECTOR_SHIFT) % BLOCK_SIZE; size = ( blkif_last_sect (req->frame_and_sects[i]) - blkif_first_sect(req->frame_and_sects[i]) + 1 - ) << SECTOR_SHIFT; + ) << SECTOR_SHIFT; start = blkif_first_sect(req->frame_and_sects[i]) << SECTOR_SHIFT; DPRINTF("ParallaxWrite: sect: %lld (%ld,%ld), " @@ -527,15 +526,15 @@ int parallax_write(blkif_request_t *req, blkif_t *blkif) goto err; } - /* TODO: Replace this call to malloc with a cached allocation */ - p = (struct cb_param *)malloc(sizeof(struct cb_param)); - p->pent = pent; - p->sector = sector; - p->segment = i; - p->vblock = vblock; /* dbg */ - + /* TODO: Replace this call to malloc with a cached allocation */ + p = (struct cb_param *)malloc(sizeof(struct cb_param)); + p->pent = pent; + p->sector = sector; + p->segment = i; + p->vblock = vblock; /* dbg */ + /* Issue the write to the store. */ - async_write(vdi, vblock, spage, write_cb, (void *)p); + async_write(vdi, vblock, spage, write_cb, (void *)p); } return BLKTAP_STOLEN; diff --git a/tools/blktap/requests-async.c b/tools/blktap/requests-async.c index bb2d07b60a..f68ae76db4 100755 --- a/tools/blktap/requests-async.c +++ b/tools/blktap/requests-async.c @@ -1,6 +1,6 @@ -/* read.c +/* requests-async.c * - * asynchronous read experiment for parallax. + * asynchronous request dispatcher for radix access in parallax. */ #include @@ -17,9 +17,6 @@ #define L3_IDX(_a) (((_a) & 0x00000000000001ffULL)) - -//#define STANDALONE - #if 0 #define DPRINTF(_f, _a...) printf ( _f , ## _a ) #else @@ -45,10 +42,10 @@ struct io_req { void clear_w_bits(radix_tree_node node) { - int i; - for (i=0; iradix[0] = req->radix[1] = req->radix[2] = NULL; + req->radix[0] = req->radix[1] = req->radix[2] = NULL; - if (req == NULL) {perror("req was NULL in async_read"); return(-1); } + if (req == NULL) {perror("req was NULL in async_read"); return(-1); } req->op = IO_OP_READ; req->root = vdi->radix_root; @@ -135,7 +132,7 @@ int async_read(vdi_t *vdi, u64 vaddr, io_cb_t cb, void *param) req->param = param; req->state = READ_LOCKED; - block_rlock(req->lock, L1_IDX(vaddr), read_cb, req); + block_rlock(req->lock, L1_IDX(vaddr), read_cb, req); return 0; } @@ -148,10 +145,9 @@ int async_write(vdi_t *vdi, u64 vaddr, char *block, req = (struct io_req *)malloc(sizeof (struct io_req)); - req->radix[0] = req->radix[1] = req->radix[2] = NULL; - //DPRINTF("async_write\n"); + req->radix[0] = req->radix[1] = req->radix[2] = NULL; - if (req == NULL) {perror("req was NULL in async_write"); return(-1); } + if (req == NULL) {perror("req was NULL in async_write"); return(-1); } req->op = IO_OP_WRITE; req->root = vdi->radix_root; @@ -163,10 +159,10 @@ int async_write(vdi_t *vdi, u64 vaddr, char *block, req->radix_addr[L1] = getid(req->root); /* for consistency */ req->state = WRITE_LOCKED; - block_wlock(req->lock, L1_IDX(vaddr), write_cb, req); + block_wlock(req->lock, L1_IDX(vaddr), write_cb, req); - return 0; + return 0; } void read_cb(struct io_ret ret, void *param) @@ -197,11 +193,11 @@ void read_cb(struct io_ret ret, void *param) idx = getid( node[L1_IDX(req->vaddr)] ); free(block); if ( idx == ZERO ) { - req->state = RETURN_ZERO; - block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req); + req->state = RETURN_ZERO; + block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req); } else { - req->state = READ_L2; - block_read(idx, read_cb, req); + req->state = READ_L2; + block_read(idx, read_cb, req); } break; @@ -214,11 +210,11 @@ void read_cb(struct io_ret ret, void *param) idx = getid( node[L2_IDX(req->vaddr)] ); free(block); if ( idx == ZERO ) { - req->state = RETURN_ZERO; - block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req); + req->state = RETURN_ZERO; + block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req); } else { - req->state = READ_L3; - block_read(idx, read_cb, req); + req->state = READ_L3; + block_read(idx, read_cb, req); } break; @@ -231,11 +227,11 @@ void read_cb(struct io_ret ret, void *param) idx = getid( node[L3_IDX(req->vaddr)] ); free(block); if ( idx == ZERO ) { - req->state = RETURN_ZERO; - block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req); + req->state = RETURN_ZERO; + block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req); } else { - req->state = READ_DATA; - block_read(idx, read_cb, req); + req->state = READ_DATA; + block_read(idx, read_cb, req); } break; @@ -249,9 +245,9 @@ void read_cb(struct io_ret ret, void *param) break; case READ_UNLOCKED: - { - struct io_ret r; - io_cb_t cb; + { + struct io_ret r; + io_cb_t cb; DPRINTF("READ_UNLOCKED\n"); req_param = req->param; r = req->retval; @@ -262,18 +258,18 @@ void read_cb(struct io_ret ret, void *param) } case RETURN_ZERO: - { - struct io_ret r; - io_cb_t cb; - DPRINTF("RETURN_ZERO\n"); - req_param = req->param; + { + struct io_ret r; + io_cb_t cb; + DPRINTF("RETURN_ZERO\n"); + req_param = req->param; cb = req->cb; - free(req); + free(req); r.type = IO_BLOCK_T; r.u.b = newblock(); - cb(r, req_param); - break; - } + cb(r, req_param); + break; + } default: DPRINTF("*** Write: Bad state! (%d) ***\n", req->state); @@ -283,16 +279,16 @@ void read_cb(struct io_ret ret, void *param) return; fail: - { - struct io_ret r; - io_cb_t cb; - DPRINTF("asyn_read had a read error.\n"); + { + struct io_ret r; + io_cb_t cb; + DPRINTF("asyn_read had a read error.\n"); req_param = req->param; r = ret; cb = req->cb; free(req); cb(r, req_param); - } + } } @@ -304,11 +300,10 @@ void write_cb(struct io_ret r, void *param) u64 a, addr; void *req_param; - //DPRINTF("write_cb\n"); switch(req->state) { case WRITE_LOCKED: - + DPRINTF("WRITE_LOCKED (%llu)\n", L1_IDX(req->vaddr)); req->state = READ_L1; block_read(getid(req->root), write_cb, req); @@ -326,9 +321,9 @@ void write_cb(struct io_ret r, void *param) req->radix[L1] = node; if ( addr == ZERO ) { - /* L1 empty subtree: */ - req->state = ALLOC_DATA_L1z; - block_alloc( req->block, write_cb, req ); + /* L1 empty subtree: */ + req->state = ALLOC_DATA_L1z; + block_alloc( req->block, write_cb, req ); } else if ( !iswritable(a) ) { /* L1 fault: */ req->state = READ_L2_L1f; @@ -351,7 +346,7 @@ void write_cb(struct io_ret r, void *param) req->radix[L2] = node; if ( addr == ZERO ) { - /* L2 empty subtree: */ + /* L2 empty subtree: */ req->state = ALLOC_DATA_L2z; block_alloc( req->block, write_cb, req ); } else if ( !iswritable(a) ) { @@ -447,7 +442,7 @@ void write_cb(struct io_ret r, void *param) addr = getid(a); req->radix[L3] = node; - req->state = ALLOC_DATA_L2f; + req->state = ALLOC_DATA_L2f; block_alloc( req->block, write_cb, req ); break; @@ -520,14 +515,14 @@ void write_cb(struct io_ret r, void *param) req->radix[L2] = node; if (addr == ZERO) { - /* nothing below L2, create an empty L3 and alloc data. */ - /* (So skip READ_L3_L1f.) */ - req->radix[L3] = newblock(); - req->state = ALLOC_DATA_L1f; - block_alloc( req->block, write_cb, req ); + /* nothing below L2, create an empty L3 and alloc data. */ + /* (So skip READ_L3_L1f.) */ + req->radix[L3] = newblock(); + req->state = ALLOC_DATA_L1f; + block_alloc( req->block, write_cb, req ); } else { - req->state = READ_L3_L1f; - block_read( addr, write_cb, req ); + req->state = READ_L3_L1f; + block_read( addr, write_cb, req ); } break; @@ -541,7 +536,7 @@ void write_cb(struct io_ret r, void *param) addr = getid(a); req->radix[L3] = node; - req->state = ALLOC_DATA_L1f; + req->state = ALLOC_DATA_L1f; block_alloc( req->block, write_cb, req ); break; @@ -587,7 +582,7 @@ void write_cb(struct io_ret r, void *param) DPRINTF("DONE\n"); /* free any saved node vals. */ for (i=0; i<3; i++) - if (req->radix[i] != 0) free(req->radix[i]); + if (req->radix[i] != 0) free(req->radix[i]); req->retval = r; req->state = WRITE_UNLOCKED; block_wunlock(req->lock, L1_IDX(req->vaddr), write_cb, req); @@ -601,7 +596,7 @@ void write_cb(struct io_ret r, void *param) req_param = req->param; r = req->retval; cb = req->cb; - free(req); + free(req); cb(r, req_param); break; } @@ -614,16 +609,16 @@ void write_cb(struct io_ret r, void *param) return; fail: - { - struct io_ret r; - io_cb_t cb; - DPRINTF("asyn_write had a read error mid-way.\n"); + { + struct io_ret r; + io_cb_t cb; + DPRINTF("asyn_write had a read error mid-way.\n"); req_param = req->param; cb = req->cb; r.type = IO_INT_T; r.u.i = -1; free(req); cb(r, req_param); - } + } } -- 2.30.2